package com.duobaoyu.niodemo;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Date;
import java.util.Iterator;
import java.util.Set;

/**
 * 1.客户端发起的连接操作是异步的，可以通过在多路复用器注册op_connect等待后续结果，
 * 不需要像之前的客户端那样被同步阻塞
 * 2.socketChannel 的读写操作都是异步的，如果没有可读写的数据它不会同步等待，
 * 直接返回，这样i/o通信线程就可以处理其他的链路，不需要同步等待这个链路可用。
 * 3.线程模型的优化：由于jdk的Selector在linux等主流操作系统上通过 epoll 实现，
 * 它没有连接句柄数的限制（只受限于操作系统的最大句柄数或者对单个线程的句柄限制），
 * 这意味着一个Selector线程可以同时处理成千上万个客户端连接，而且性能不会随着客户端的增加而线性下降。
 *
 *
 *
 * @author guiguan
 * @date 2019/9/20 14:40
 */
public class MultiplexerTimeServer implements Runnable {

    private Selector selector;

    private ServerSocketChannel serverSocketChannel;

    private volatile boolean stop;

    public MultiplexerTimeServer(int port) {
        try {
            //启用reactor线程 内部调用 SelectorProvider.provider().openSelector()
            selector = Selector.open();
            //打开ServerSocketChannel 用于监听客户端的链接，它是所有客户端链接的父管道
            serverSocketChannel = ServerSocketChannel.open();
            //设置为非阻塞模式
            serverSocketChannel.configureBlocking(false);
            //绑定服务端地址
            serverSocketChannel.socket().bind(new InetSocketAddress(port), 1024);
            //将channel注册到reactor线程的多路复用器 selector上，监听accept事件
            //accept事件与socket的accept概念相同，即接收客户端链接
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("The time server is start in port" + port);
        } catch (Exception e) {
            e.printStackTrace();
            //system.exit（0）:正常退出，程序正常执行结束退出
            //system.exit (1) :是非正常退出，就是说无论程序正在执行与否，都退出
            System.exit(1);
        }
    }


    public void stop() {
        this.stop = true;
    }


    @Override
    public void run() {
        while (!stop) {
            try {
                //去linux 内核去拿事件消息
                //设置阻塞时间，仅仅在有就绪的selectionKey时返回，如果没有准备就绪的key就阻塞，直到超时，注意这个时间并不准确
                selector.select(1000);
                //selectedKeys 方法是取publicSelectedKeys，publicSelectedKeys只能删除不能添加
                //且没有客户端链接时不阻塞，返回一个空集合
                Set<SelectionKey> selectedKeys = selector.selectedKeys();
                Iterator<SelectionKey> it = selectedKeys.iterator();
                SelectionKey key = null;
                while ((it.hasNext())) {
                    key = it.next();
                    it.remove();
                    try {
                        handleInput(key);
                    } catch (Exception e) {
                        key.cancel();
                        if (key.channel() != null) {
                            key.channel().close();
                        }
                    }
                }
            } catch (Throwable t) {
                t.printStackTrace();
            }
        }

        //多路复用器关闭后，所有注册在上面的channel 和 pipe 等资源都会被自动去注册并关闭，所以不需要重复释放资源
        if (selector != null) {
            try {
                selector.close();
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    }

    private void handleInput(SelectionKey key) throws IOException {
/*        判断key是否可用，之所以需要判断key是否可用，是因为SelectionKey通过调用某个键的 cancel 方法、关闭其通道，
         或者通过关闭其选择器来取消 该键之前，它一直保持有效。取消某个键不会立即从其选择器中移除它；
         相反，会将该键添加到选择器的已取消键集，以便在下一次进行选择操作时移除它。可通过调用某个键的 isValid 方法来测试其有效性*/
        if (key.isValid()) {
            //处理新接入的请求
            if (key.isAcceptable()) {
                //接收新连接 SelectionKey中存储了channel 和 selector
                ServerSocketChannel serverSocketChannel = (ServerSocketChannel) key.channel();
                SocketChannel socketChannel = serverSocketChannel.accept();
                socketChannel.configureBlocking(false);
/*                注册共分为两步：
                1.findKey 遍历所有的selectionKey，如果有key中的selector与传入的selector相同，则返回该key（该方法为同步方法）
                2.如果获取的key不为空，则将该key内的 interestOps 修改为传入的 interestOps，此处我们传入 SelectionKey.OP_READ
                attach方法不用理会，该方法为将某个特定的值与key绑定，此处绑定的是一个null值
                注：interest 集合 确定了下一次调用某个选择器的选择方法时，将测试哪类操作的准备就绪信息。
                 创建该键时使用给定的值初始化 interest 集合；之后可通过 interestOps(int) 方法对其进行更改。*/
                socketChannel.register(selector, SelectionKey.OP_READ);
            }
/*            判断是否可读，通过key内的注册事件来判断 readyOps 是否等于 SelectionKey.OP_READ 判断方法使用了&运算
            注：ready 集合 标识了这样一类操作，即某个键的选择器检测到该键的通道已为此类操作准备就绪。创建该键时 ready 集合被初始化为零；
             可以在之后的选择操作中通过选择器对其进行更新，但不能直接更新它
            注意 ready集合指示仅是一个指示，并不能保证线程执行此类别中的操作而不被阻塞。ready标识可能因为外部事件和通道上的io操作变得不准*/
            if (key.isReadable()) {
                //读取数据
                SocketChannel socketChannel = (SocketChannel) key.channel();
      /*          分配缓冲区大小 内部使用的是HeapByteBuffer
                Buffer定义了四个私有字段 mark，position，limit，capacity 且mark <= position <= limit <= capacity
                position: 写数据时当前位置，读数据不会修改position
                mark: 当缓冲区初始化时、设置新的position或limit的值时、清理缓冲区时、设置读模式时等，mark值都会被标为-1,
                 mark作为position的一个临时存储的变量而存在，我们随时都可以调用reset()把之前存储的原position值重新赋给position变量。
                 当一个position不够用时，我们需要多一个临时变量存储，这似乎就是mark存在的意义了,仅仅在使用mark()方法的时候，mark字段才会被赋予position值*/
                ByteBuffer readBuffer = ByteBuffer.allocate(1024);
         /*     缓冲区读取数据 表面上是读取流数据，但实际上是把channel的数据写到了buffer里
         *      返回值大于0： 读取到了字节，对字节进行编解码
         *      返回值等于0：没有读取到字节，属于正常场景，忽略
         *      返回值为-1：链路已经关闭，需要关闭SocketChannel，释放资源*/

                int readBytes = socketChannel.read(readBuffer);
                if (readBytes > 0) {
                   /* 切换模式 本处为写模式切换为读模式 注意该方法是线程不安全的，如果想要线程安全需要自己实现
                      写转读用flip，读转写用clean
                       flip的作用是将缓冲区当前的limit设置为position，position设置为0，用于后续对缓冲区的读取操作。
                       */
                    readBuffer.flip();
                    //获取遗留的数据
                    byte[] bytes = new byte[readBuffer.remaining()];
                    //调用ByteBuffer的get操作将缓冲区可读的字节数组赋值到新创建的字节数组中
                    readBuffer.get(bytes);
                    //利用String构造函数创建消息体
                    String body = new String(bytes, StandardCharsets.UTF_8);
                    System.out.println("The time server receive order :" + body);
                    //如果请求指令是
                    String currentTime = "Query time order".equalsIgnoreCase(body) ? new Date().toString() : "Bad order";
                    doWrite(socketChannel, currentTime);
                } else if (readBytes < 0) {
                    //关闭链路
                    key.cancel();
                    socketChannel.close();
                } else {
                    //读到0字节，忽略
                }
            }


        }
    }

    private void doWrite(SocketChannel socketChannel, String response) throws IOException {
        if (response != null && response.trim().length() > 0) {
            byte[] bytes = response.getBytes();
            //开辟一个新的buffer
            ByteBuffer writeBuffer = ByteBuffer.allocate(bytes.length);
            writeBuffer.put(bytes);
            //此处之所以不用clear，是因为这是一个全新的buffer
            writeBuffer.flip();
            writeBuffer.compact();
           /* 读取这个buffer到channel内，然后通过channel写出
           * 需要注意，socketChannel是异步非阻塞的，并不能保证一次能够把需要发送的字节数组发送完，
           * 此时会发生写半包的问题，需要注册写操作，不断轮训selector将没有发送完的ByteBuffer发送完毕，
           * 可以通过ByteBuffer的hasRemaining()方法判断消息是否发送完成 */
            socketChannel.write(writeBuffer);
        }

    }


}
